import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Iterations
{
    public static void main(String[] args) throws Exception
    {


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> someIntegers = env.generateSequence(0, 3);


        someIntegers.print("someIntegers");



    //这里输出的是 env.generateSequence生成的数字，上面0至10 一共11个数\
/*
    3> 2
    8> 7
    7> 6
    5> 4
    6> 5
    4> 3
    1> 0
    1> 8
    2> 1
    3> 10
    2> 9
第一列是分区编号
第二列是具体数据

*/



//    把DataStream转化为IterativeStream
   IterativeStream<Long> iteration = someIntegers.iterate();

//   iteration.print("iteration_origin");

    DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>()
        {
            @Override
            public Long map(Long value) throws Exception
            {
                return value + 50 ;
            }
        });

    minusOne.print("minusOne");

    DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>()
    {
        @Override
        public boolean filter(Long value) throws Exception
        {
            return (value <= 120);
        }
    });

stillGreaterThanZero.print("stillGreaterThanZero");






        iteration.closeWith(stillGreaterThanZero);

        iteration.print("iteration_close");




        DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>()
        {
            @Override
            public boolean filter(Long value) throws Exception
            {
                return (value <= 0);
            }
        });


        env.execute();

    }
}

